//file created by zsy on 2017-09-16
// This component depends on the LSV bitmap and its snapshot implementation.
// Supports ROW2 only; the volume format is therefore used to distinguish how a bitmap unit is written: on LSV the bitmap is page-level, while on ROW2 it is chunk-level.

#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

#include "stack.h"
#include "list.h"
#include "lsv_volume.h"
#include "lsv_bitmap.h"
#include "lsv_bitmap_snap.h"
#include "lsv_bitmap_internal.h"
#include "lsv_conf.h"
#include "lsv_help.h"
#include "lsv_log.h"
#include "lsv_gc.h"
#include "row2_bitmap.h"

#define OFFSET_TO_INDEX_BY_CHUNK(x)        (( (x) / BITMAP_CHUNK_SIZE) / ( BITMAP_CHUNK_SIZE /sizeof(struct row2_bitmap_unit)) )
#define OFFSET_TO_RNDEX_BY_CHUNK(x)        (( (x) / BITMAP_CHUNK_SIZE) % ( BITMAP_CHUNK_SIZE /sizeof(struct row2_bitmap_unit)) )

/*
 * Map a single row2_bitmap_unit, located at (vvol_id, chunk_id, chunk_off),
 * into the bitmap cache and expose it through *accessor.
 *
 * The cache is accessed in whole LSV_PAGE_SIZE units, so the requested
 * [chunk_off, chunk_off + len) span is widened to page-aligned
 * [off_new, off_new + len_new) before the cache lookup.
 *
 * Returns 0 on success, errno on cache-get failure. On success the cache
 * unit/page locks remain held; they are released later by row_access_end()
 * (see the commented-out unlock calls below).
 */
int private row_access_bitmap_entry(struct lsv_bitmap_context *bitmap_context, uint32_t vvol_id, uint64_t chunk_id, uint32_t chunk_off,
        chunked_bitmap_accessor_t * accessor)
{
        uint32_t len = sizeof(struct row2_bitmap_unit);
        uint64_t off_new = chunk_off;
        uint32_t len_new = len;

        /* The unit must lie entirely inside one chunk. */
        assert(chunk_off + len <= LSV_CHUNK_SIZE);

        /* Round the start down to a page boundary, extending the length to compensate. */
        if(chunk_off % LSV_PAGE_SIZE)
        {
                off_new = chunk_off - (chunk_off % LSV_PAGE_SIZE);
                len_new += chunk_off - off_new;
        }

        /* Round the length up to a whole number of pages. */
        if(len_new % LSV_PAGE_SIZE != 0)
                len_new = len_new + (LSV_PAGE_SIZE - (len_new % LSV_PAGE_SIZE));

        lsv_bitmap_cache_unit_t *cache_ref;
        uint8_t *cache_buf = lsv_bitmap_cache_get(bitmap_context, vvol_id, chunk_id, off_new, len_new, 0, &cache_ref);
        //assert(cache_buf);
        /* NOTE(review): errno is assumed to be set by lsv_bitmap_cache_get on failure — confirm. */
        if(!cache_buf)
                return errno;

        assert(cache_ref->chunk_id == chunk_id);

        /* cache_buf appears to point at the chunk base, so the unit sits at chunk_off
         * within it — TODO confirm against lsv_bitmap_cache_get's contract. */
        accessor->bitmap = (struct row2_bitmap_unit *)(cache_buf + chunk_off);
        accessor->lock_ctx = cache_ref;
        accessor->offset = off_new;
        accessor->len = len_new;

        /* Locks intentionally kept held here; released in row_access_end(). */
        //lsv_bitmap_cache_page_unlock(cache_ref, off_new / PAGE_SIZE, len_new / PAGE_SIZE);
        //lsv_bitmap_cache_unlock(cache_ref);

        return 0;
}

/*
 * Release the cache locks acquired by row_access_bitmap_entry() for this
 * accessor. A NULL bitmap means no chunk was mapped, so there is nothing
 * to release.
 */
void row_access_end(chunked_bitmap_accessor_t * accessor)
{
        lsv_bitmap_cache_unit_t *lock = accessor->lock_ctx;

        if(accessor->bitmap == NULL)
                return; /* unallocated chunk: nothing was mapped */

        /* Drop the page-range locks first, then the unit lock. */
        lsv_bitmap_cache_page_unlock(lock, accessor->offset / PAGE_SIZE, accessor->len / PAGE_SIZE);
        lsv_bitmap_cache_unlock(lock);
}

//read one entry
int private row_bitmap_chunked_access_node(struct lsv_bitmap_context *bitmap_context, uint64_t off, chunked_bitmap_accessor_t * accessor)
{
        uint64_t chunk_index;
        uint32_t chunk_off;
        uint32_t chunk_id;
        uint32_t vvol_id;

        int ret = 0;
        struct lsv_bitmap_header *pheader = BITMAP_HEADER(bitmap_context);

        chunk_index = OFFSET_TO_INDEX_BY_CHUNK(off);
        chunk_off = OFFSET_TO_RNDEX_BY_CHUNK(off) * sizeof(struct row2_bitmap_unit);
        chunk_id = pheader->chunk_map[chunk_index].chunk_id;
        vvol_id = pheader->chunk_map[chunk_index].vvol_id;

        DINFO("bitmap_read>off:%llu,chunk_id:%u,chunk_off:%u\n", (LLU)off,chunk_id,chunk_off);
        if(chunk_id == 0) //no bitmap allocated.
                accessor->bitmap = NULL;
                //memset(bitmap_buf, 0, sizeof(struct row2_bitmap_unit));
        else
        {
                ret = row_access_bitmap_entry(bitmap_context, vvol_id, chunk_id, chunk_off, accessor);
                if(pheader->chunk_map[chunk_index].is_ref)
                {
                        accessor->bitmap->owner = 0;  //important.
                }
        }

        if(ret)
        {

                //fatal error.
                return ret;
        }

        return ret;
}

/*
 * Public entry: map the bitmap entry at 'off' under the bitmap read lock.
 * Thin locking wrapper around row_bitmap_chunked_access_node().
 */
int row_bitmap_chunked_access(void *volume_context, uint64_t off, chunked_bitmap_accessor_t * accessor)
{
        struct lsv_bitmap_context *node = lsv_bitmap_volume_to_node(volume_context);
        int rc;

        lsv_bitmap_rlock(volume_context);
        rc = row_bitmap_chunked_access_node(node, off, accessor);
        lsv_bitmap_unlock(volume_context);

        return rc;
}

/*
 * Read one bitmap entry at logical offset 'off' into *bitmap_buf.
 *
 * An unallocated chunk (chunk_id == 0) reads back as all zeroes. For
 * snapshot-shared chunks (is_ref) the owner field is cleared after a
 * successful read, so the caller never sees the snapshot's ownership.
 *
 * Fix over the original: on a failed lsv_read_bitmap_node() the owner field
 * of the caller's buffer was still cleared, mutating the output buffer on an
 * error path. The error is now checked before any post-processing.
 *
 * Returns 0 on success, error code on read failure.
 */
int private row_bitmap_chunked_read_node(struct lsv_bitmap_context *bitmap_context, uint64_t off, struct row2_bitmap_unit *bitmap_buf)
{
        uint64_t chunk_index;
        uint32_t chunk_off;
        uint32_t chunk_id;
        uint32_t vvol_id;

        int ret = 0;
        struct lsv_bitmap_header *pheader = BITMAP_HEADER(bitmap_context);

        /* Translate the logical offset into (chunk, byte-offset-in-chunk). */
        chunk_index = OFFSET_TO_INDEX_BY_CHUNK(off);
        chunk_off = OFFSET_TO_RNDEX_BY_CHUNK(off) * sizeof(struct row2_bitmap_unit);
        chunk_id = pheader->chunk_map[chunk_index].chunk_id;
        vvol_id = pheader->chunk_map[chunk_index].vvol_id;

        DINFO("bitmap_read>off:%llu,chunk_id:%u,chunk_off:%u\n", (LLU)off,chunk_id,chunk_off);

        if(chunk_id == 0) //no bitmap allocated.
        {
                memset(bitmap_buf, 0, sizeof(struct row2_bitmap_unit));
        }
        else
        {
                ret = lsv_read_bitmap_node(bitmap_context, vvol_id, chunk_id, chunk_off, sizeof(struct row2_bitmap_unit), bitmap_buf);
                if(ret)
                {
                        //fatal error: leave the caller's buffer untouched beyond what the read did.
                        return ret;
                }

                if(pheader->chunk_map[chunk_index].is_ref)
                {
                        bitmap_buf->owner = 0;  //important: snapshot-shared chunk, we do not own it.
                }
        }

        #if LSV_BITMAP_CHECK_BITMAP_LBA
        assert(bitmap_buf->lba == 0 || bitmap_buf->lba == off);
        #endif

        return 0;
}

/*
 * Public entry: read the bitmap entry at 'off' under the bitmap read lock.
 * Thin locking wrapper around row_bitmap_chunked_read_node().
 */
int row_bitmap_chunked_read(void *volume_context, uint64_t off, struct row2_bitmap_unit *bitmap_buf)
{
        struct lsv_bitmap_context *node = lsv_bitmap_volume_to_node(volume_context);
        int rc;

        lsv_bitmap_rlock(volume_context);
        rc = row_bitmap_chunked_read_node(node, off, bitmap_buf);
        lsv_bitmap_unlock(volume_context);

        return rc;
}


/*
 * Write one bitmap entry at logical offset 'off'.
 *
 * Three cases, driven by the chunk-map slot covering 'off':
 *   1. chunk_id == 0   : no bitmap chunk exists yet — allocate one, zero it
 *                        in cache, update the header mapping.
 *   2. is_ref set      : chunk is shared with a snapshot — copy-on-write it
 *                        into a freshly allocated chunk before writing.
 *   3. otherwise       : write directly into the existing chunk.
 * Finally the unit is written via lsv_write_bitmap_node(), and the bitmap
 * header is persisted if the mapping changed.
 *
 * WARNING (from the original author, preserved): any I/O call in here may
 * yield, so state read before an I/O call can be stale after it returns —
 * note the re-check of is_ref after the cache gets in the COW branch.
 *
 * NOTE(review): on the error paths where lsv_bitmap_cache_get() fails after
 * lsv_bitmap_alloc_chunk() succeeded, the newly allocated chunk appears to
 * leak; in the COW branch the 'ref' cache lock also appears to leak when the
 * second cache_get fails — confirm against the cache/alloc contracts.
 *
 * Returns 0 on success, error code otherwise.
 */
int private row_bitmap_chunked_write_node(void *volume_context, struct lsv_bitmap_context *bitmap_context,
        uint64_t off, struct row2_bitmap_unit *bitmap_buf, int flags)
{
        uint64_t chunk_index;
        uint32_t chunk_off;
        uint32_t chunk_id;
        int     ret = 0;
        struct lsv_bitmap_header *pheader = BITMAP_HEADER(bitmap_context);
        int     header_changed = 0;     /* set when chunk_map is modified and must be persisted */
        lsv_volume_proto_t *lsv_info = volume_context;

        (void) flags;
        /* Translate the logical offset into (chunk, byte-offset-in-chunk). */
        chunk_index = OFFSET_TO_INDEX_BY_CHUNK(off);
        chunk_off = OFFSET_TO_RNDEX_BY_CHUNK(off) * sizeof(struct row2_bitmap_unit);
        chunk_id = pheader->chunk_map[chunk_index].chunk_id;
        uint8_t *cache_buf, *new_cache_buf;

        #if LSV_BITMAP_CHECK_BITMAP_LBA
        /* Stamp the entry with its own offset for later read-side verification. */
        bitmap_buf->lba = off;
        #endif

        if(chunk_id == 0) //no bitmap allocated.
        {
                lsv_info->row2_stat.meta_malloc++;

                lsv_bitmap_cache_unit_t *ref = NULL;
                /* Allocate a zeroed bitmap chunk (3rd arg presumably requests zeroing — confirm). */
                ret = lsv_bitmap_alloc_chunk(bitmap_context->volume_context, &chunk_id, 1);
                if(ret)
                {
                        //fatal error.
                        return ret;
                }

                /* Get a fresh (anonymous, chunk_id 0) cache unit to back the new chunk. */
                cache_buf = lsv_bitmap_cache_get(bitmap_context, pheader->chunk_map[chunk_index].vvol_id, 0, 0, 0, 1, &ref);
                if(!cache_buf)
                        return errno;

                /* Publish the new chunk in the header mapping; we own it, so is_ref = 0. */
                pheader->chunk_map[chunk_index].is_ref = 0;
                pheader->chunk_map[chunk_index].chunk_id = chunk_id;
                pheader->chunk_map[chunk_index].vvol_id = lsv_bitmap_get_current_volume(bitmap_context)->bitmap_header->vvol_id;

                header_changed = 1;

                /* Bind the cache unit to the real chunk id and flush the whole chunk. */
                ref->chunk_id = chunk_id;
                lsv_bitmap_cache_mark_dirty(ref, 0, BITMAP_CHUNK_SIZE / PAGE_SIZE);

                lsv_bitmap_cache_unlock(ref);

                DINFO("lsv bitmap cache set chunk_id=%u\r\n",chunk_id);
        }
        else if(pheader->chunk_map[chunk_index].is_ref) //snapshot.
        {
                lsv_info->row2_stat.meta_malloc++;

                lsv_bitmap_cache_unit_t *ref, *new_ref;
                uint32_t new_chunk_id;
                struct lsv_bitmap_context *root = lsv_bitmap_get_root(bitmap_context);
                /*be carefull to modify this part of code, as I/O function calling maybe yield out of the entry, and probably cause memory inconsistency after return back, I/O function must be considered as an aysnc operation.*/
                ret = lsv_bitmap_alloc_chunk(bitmap_context->volume_context, &new_chunk_id, 0);   //chunk is replaced, important, may yield.
                if(ret)
                {
                        //fatal error.
                        return ret;
                }

                //BITMAP_CHCHE_ID(bitmap_context) = chunk_id; //id changed, now cheat to lower level write to the new chunk...

                /* Pull the snapshot-shared source chunk into cache (may yield). */
                cache_buf = lsv_bitmap_cache_get(bitmap_context, pheader->chunk_map[chunk_index].vvol_id, chunk_id, 0, 0, 1, &ref);
                if(!cache_buf)
                        return errno;
                //assert(cache_buf);

                assert(ref->chunk_id == chunk_id);

                /*if(pheader->chunk_map[chunk_index].vvol_id && pheader->chunk_map[chunk_index].vvol_id != lsv_bitmap_get_current_volume(bitmap_context)->bitmap_header->vvol_id)
                {
                uint64_t vol_id = 0;
                lsv_bitmap_vvol_to_vol(bitmap_context->volume_context, pheader->chunk_map[chunk_index].vvol_id, &vol_id);

                DINFO("volume_proto_remote_read_chunk vold_id=%lld\r\n", (LLU)vol_id);

                ret = volume_proto_remote_read_chunk(vol_id, chunk_id, 0, CHUNK_SIZE, cache_buf);

                if(ret)
                {
                DINFO("volume_proto_remote_read_chunk error %d\r\n", ret);
                //fatal error.

                lsv_bitmap_free_chunk(bitmap_context->volume_context, chunk_id);
                return ret;
                }

                DINFO("volume_proto_remote_read_chunk good\r\n");
                }*/

                /* Get a fresh cache unit on the current volume to receive the copy (may yield). */
                new_cache_buf = lsv_bitmap_cache_get(bitmap_context, pheader->vvol_id, 0, 0, 0, 1, &new_ref);  //on new vol.
                if(!new_cache_buf)
                        return errno;

                assert(new_cache_buf);
                assert(cache_buf);
                assert(ref->chunk_id == chunk_id);

                /* Re-check is_ref: a concurrent writer may have completed the COW
                 * while one of the calls above yielded. */
                if(pheader->chunk_map[chunk_index].is_ref) //others already did.
                {
                        new_ref->chunk_id = new_chunk_id; //id changed, no need write here, just cheat to lower level write to the new chunk...
                        memcpy(new_cache_buf, cache_buf, CHUNK_SIZE);

                        /* Switch the mapping to the private copy; we own it now. */
                        pheader->chunk_map[chunk_index].is_ref = 0;
                        pheader->chunk_map[chunk_index].chunk_id = chunk_id = new_chunk_id;
                        pheader->chunk_map[chunk_index].vvol_id = lsv_bitmap_get_current_volume(bitmap_context)->bitmap_header->vvol_id;

                        lsv_bitmap_cache_mark_dirty(new_ref, 0, BITMAP_CHUNK_SIZE / PAGE_SIZE);
                        header_changed = 1;

                        /* Let the root context post-process the copied chunk; its absence is a bug. */
                        if(root->data_cow_callback)
                                root->data_cow_callback(new_cache_buf);
                        else
                                assert(0);
                }
                else
                        DINFO("lsv cow yeild happen, chunk_id=%u.\r\n", chunk_id);

                lsv_bitmap_cache_unlock(ref);
                lsv_bitmap_cache_unlock(new_ref);
        }

        //if not set then set current volume id.
        //if(bitmap_buf->vvol_id == 0)
        //        bitmap_buf->vvol_id = bitmap_context->bitmap_header->vvol_id;

        //no gc in chunk bitmap.
        ret = lsv_write_bitmap_node(volume_context, bitmap_context, chunk_id, chunk_off, sizeof(struct row2_bitmap_unit),
        bitmap_buf, LSV_FEATURE_BITMAP_GC_VALUE_UP);
        if(ret)
        {
                //fatal error.
                return ret;
        }

        if(header_changed)
        {
                /* Persist the updated chunk_map before reporting success. */
                lsv_info->row2_stat.head_write++;
                return bitmap_header_save(bitmap_context, SAVE_HEADER_LEVEL_ALL);
        }
        else
                return ret;
}

int row_bitmap_chunked_write(void *volume_context, uint64_t off, struct row2_bitmap_unit *bitmap_buf)
{
        int ret;
        struct lsv_bitmap_context *root = ((lsv_volume_proto_t *)volume_context)->bitmap_context;
        struct lsv_bitmap_context * node = lsv_bitmap_volume_to_node(volume_context);

        lsv_bitmap_rlock(volume_context);

        ret = row_bitmap_chunked_write_node(volume_context, node, off, bitmap_buf, 0);

        lsv_bitmap_unlock(volume_context);

        if(! (root->flags & LSV_BITMAP_FLAG_DISABLE_GC))
                lsv_gc_valueup_commit(volume_context);

        return ret;
}

/*
 * Return 1 if a write to the bitmap entry at 'off' would trigger allocation
 * or copy-on-write (chunk not yet allocated, or shared with a snapshot);
 * 0 otherwise.
 */
int private row_bitmap_chunked_will_cow_node(struct lsv_bitmap_context *bitmap_context, uint64_t off)
{
        struct lsv_bitmap_header *hdr = BITMAP_HEADER(bitmap_context);
        uint64_t idx = OFFSET_TO_INDEX_BY_CHUNK(off);

        return (hdr->chunk_map[idx].chunk_id == 0 || hdr->chunk_map[idx].is_ref) ? 1 : 0;
}

/*
 * Return 1 if writing [off, off + length) would trigger bitmap chunk
 * allocation or COW on any covered page, 0 otherwise.
 *
 * Fix over the original: 'length / LSV_PAGE_SIZE' truncated, so a trailing
 * partial page — or any length < LSV_PAGE_SIZE — was never checked and the
 * function could wrongly report "no COW". The page count is now rounded up.
 * Also avoids the signed/unsigned loop comparison of the original.
 *
 * NOTE(review): the original had the rlock/unlock commented out, so this is
 * an unlocked best-effort probe — confirm that is intentional.
 */
int row_bitmap_chunked_will_cow(void *volume_context, uint64_t off, uint32_t length)
{
        int ret = 0;
        struct lsv_bitmap_context *node = lsv_bitmap_volume_to_node(volume_context);
        /* Round up so a trailing partial page is still checked. */
        uint32_t npages = (length + LSV_PAGE_SIZE - 1) / LSV_PAGE_SIZE;

        // lsv_bitmap_rlock(volume_context);

        for(uint32_t i = 0; i < npages; i++)
                ret |= row_bitmap_chunked_will_cow_node(node, off + (uint64_t)i * LSV_PAGE_SIZE);

        // lsv_bitmap_unlock(volume_context);

        return ret;
}
